package com.atguigu.flink.chapter11.time;

import com.atguigu.flink.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Demo: declaring a processing-time attribute while converting a DataStream into a Table.
 *
 * @Author lzc
 * @Date 2022/11/1 08:48
 */
public class Flink_01_Time_Process {
    /**
     * Builds a small in-memory stream of {@link WaterSensor} elements, converts it into a table
     * that carries an extra {@code pt} column declared through {@code proctime()}, then prints
     * the table schema followed by the table contents.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism is fixed at 1 for the whole job.
        environment.setParallelism(1);

        DataStreamSource<WaterSensor> readings = sampleReadings(environment);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        // 1. Add the time attribute while converting the stream into a table. Appending a
        //    field this way requires every field of the table to be declared explicitly.
        Table sensorTable = tableEnvironment.fromDataStream(
            readings,
            $("id"),
            $("ts"),
            $("vc"),
            $("pt").proctime()
        );

        sensorTable.printSchema();
        sensorTable.execute().print();

        // 2. In DDL SQL (not shown in this class).
    }

    /**
     * Creates the fixed set of sample sensor elements used by this demo.
     *
     * @param environment the execution environment the source is registered with
     * @return a source emitting six hard-coded {@link WaterSensor} elements
     */
    private static DataStreamSource<WaterSensor> sampleReadings(StreamExecutionEnvironment environment) {
        return environment.fromElements(
            new WaterSensor("s1", 1L, 10),
            new WaterSensor("s2", 1L, 10),
            new WaterSensor("s1", 2L, 20),
            new WaterSensor("s1", 3L, 30),
            new WaterSensor("s1", 4L, 40),
            new WaterSensor("s1", 5L, 50)
        );
    }
}
